/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

//
// $Id: batchprimitiveprocessor-jl.h 9705 2013-07-17 20:06:07Z pleblanc $
// C++ Interface: batchprimitiveprocessor
//
// Description:
//
//
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//
/** @file */

#pragma once

#include <boost/scoped_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/uuid/uuid.hpp>

#include "mcs_basic_types.h"
#include "primitivemsg.h"
#include "serializeable.h"
#include "primitivestep.h"
#include "brm.h"
#include "command-jl.h"
#include "resourcemanager.h"
//#include "tableband.h"

namespace joblist
{
// Combined conversion factor: 13 converts to logical blocks and a further 10
// converts to groups of 1024 logical blocks (13 + 10 == 23).
// NOTE(review): presumably applied as a bit-shift amount (2^10 == 1024) -- confirm at call sites.
constexpr uint32_t LOGICAL_BLOCKS_CONVERTER = 13 + 10;

// Number of bytes taken by the CP boolean together with the LBID; consumed by
// BatchPrimitiveProcessorJL::countThisMsg().
constexpr uint8_t CP_FLAG_AND_LBID = 9;

// Only a pointer to JobInfo is held (BatchPrimitiveProcessorJL::fJobInfo), so a
// forward declaration is sufficient here.
struct JobInfo;

/** JobList (UM-side) half of a batch primitive processor.
 *
 * A JobStep configures an instance through the set* / add*Step methods,
 * serializes it into ByteStreams (createBPP / runBPP / destroyBPP), and turns
 * the returned ByteStreams back into ElementTypes, tuples or RowGroup data
 * (getElementTypes / getTuples / getRowGroupData, ...).
 */
class BatchPrimitiveProcessorJL
{
 public:
  /* Constructor used by the JobStep */
  explicit BatchPrimitiveProcessorJL(const ResourceManager* rm);
  ~BatchPrimitiveProcessorJL();

  /* Interface used by the JobStep */

  /* Some accessors */
  /// Returns the _hasScan flag (whether this BPP was set up with a scan step;
  /// the flag itself is maintained outside this header).
  inline bool hasScan() const
  {
    return _hasScan;
  }

  /* For initializing the object */
  inline void setSessionID(uint32_t num)
  {
    sessionID = num;
  }
  inline void setStepID(uint32_t num)
  {
    stepID = num;
  }
  inline void setUniqueID(uint32_t id)
  {
    uniqueID = id;
  }
  inline void setQueryContext(const BRM::QueryContext& qc)
  {
    versionInfo = qc;
  }
  inline void setTxnID(uint32_t num)
  {
    txnID = num;
  }
  /// Sets the output type. TUPLE and ROW_GROUP outputs additionally force
  /// needRidsAtDelivery on; this setter never turns that flag off.
  inline void setOutputType(BPSOutputType o)
  {
    ot = o;

    if (ot == TUPLE || ot == ROW_GROUP)
      needRidsAtDelivery = true;
  }
  inline void setNeedRidsAtDelivery(bool b)
  {
    needRidsAtDelivery = b;
  }
  /// Enables LBID tracing iff the TRACE_LBIDS bit is present in @p flags.
  inline void setTraceFlags(uint32_t flags)
  {
    LBIDTrace = ((flags & execplan::CalpontSelectExecutionPlan::TRACE_LBIDS) != 0);
  }
  inline uint32_t getRidCount() const
  {
    return ridCount;
  }
  inline void setThreadCount(uint32_t tc)
  {
    threadCount = tc;
  }

  void addFilterStep(const pColScanStep&, std::vector<BRM::LBID_t> lastScannedLBID,
                     bool hasAuxCol, const std::vector<BRM::EMEntry>& extentsAux,
                     execplan::CalpontSystemCatalog::OID oidAux);
  void addFilterStep(const PseudoColStep&);
  void addFilterStep(const pColStep&);
  void addFilterStep(const pDictionaryStep&);
  void addFilterStep(const FilterStep&);
  void addProjectStep(const PseudoColStep&);
  void addProjectStep(const pColStep&);
  void addProjectStep(const PassThruStep&);
  void addProjectStep(const pColStep&, const pDictionaryStep&);
  void addProjectStep(const PassThruStep&, const pDictionaryStep&);

  void createBPP(messageqcpp::ByteStream&) const;
  void destroyBPP(messageqcpp::ByteStream&) const;

  /* Call this one last */
  // void addDeliveryStep(const DeliveryStep &);

  /* At runtime, feed input here */
  void addElementType(const ElementType&, uint32_t dbroot);
  void addElementType(const StringElementType&, uint32_t dbroot);
  // void setRowGroupData(const rowgroup::RowGroup &);

  void runBPP(messageqcpp::ByteStream&, uint32_t pmNum, bool isExeMgrDEC);
  void abortProcessing(messageqcpp::ByteStream*);

  /* After serializing a BPP object, reset it; it is then ready for more input */
  void reset();

  /* The JobStep calls these to initialize a BPP that starts with a column scan */
  void setLBID(uint64_t lbid, const BRM::EMEntry& scannedExtent);
  /// Sets how many times the command arrays are looped over; must be > 0
  /// (checked with idbassert).
  inline void setCount(uint16_t c)
  {
    idbassert(c > 0);
    count = c;
  }

  /* Turn a ByteStream into ElementTypes or StringElementTypes */
  void getElementTypes(messageqcpp::ByteStream& in, std::vector<ElementType>* out, bool* validCPData,
                       uint64_t* lbid, int64_t* min, int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
                       uint32_t* touchedBlocks) const;
  void getStringElementTypes(messageqcpp::ByteStream& in, std::vector<StringElementType>* out,
                             bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max,
                             uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const;
  /* (returns the row count) */
  //	uint32_t getTableBand(messageqcpp::ByteStream &in, messageqcpp::ByteStream *out,
  //		bool *validCPData, uint64_t *lbid, int64_t *min, int64_t *max,
  //		uint32_t *cachedIO, uint32_t *physIO, uint32_t *touchedBlocks) const;
  void getTuples(messageqcpp::ByteStream& in, std::vector<TupleType>* out, bool* validCPData, uint64_t* lbid,
                 int64_t* min, int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
                 uint32_t* touchedBlocks) const;
  void deserializeAggregateResults(messageqcpp::ByteStream* in, std::vector<rowgroup::RGData>* out) const;
  void getRowGroupData(messageqcpp::ByteStream& in, std::vector<rowgroup::RGData>* out, bool* validCPData,
                       uint64_t* lbid, bool* fromDictScan, int128_t* min, int128_t* max, uint32_t* cachedIO,
                       uint32_t* physIO, uint32_t* touchedBlocks, bool* countThis, uint32_t threadID,
                       bool* hasBinaryColumn, const execplan::CalpontSystemCatalog::ColType& colType) const;
  void deserializeAggregateResult(messageqcpp::ByteStream* in, std::vector<rowgroup::RGData>* out) const;
  bool countThisMsg(messageqcpp::ByteStream& in) const;

  void setStatus(uint16_t s)
  {
    status = s;
  }
  uint16_t getStatus() const
  {
    return status;
  }
  void runErrorBPP(messageqcpp::ByteStream&);

  rowgroup::RGData getErrorRowGroupData(uint16_t error) const;

  // @bug 1098
  std::vector<SCommand>& getFilterSteps()
  {
    return filterSteps;
  }
  std::vector<SCommand>& getProjectSteps()
  {
    return projectSteps;
  }

  std::string toString() const;

  /* RowGroup additions */
  void setProjectionRowGroup(const rowgroup::RowGroup& rg);
  void setInputRowGroup(const rowgroup::RowGroup& rg);

  /* Aggregation */
  void addAggregateStep(const rowgroup::SP_ROWAGG_PM_t&, const rowgroup::RowGroup&);
  void setJoinedRowGroup(const rowgroup::RowGroup& rg);

  /* Tuple hashjoin */
  void useJoiners(const std::vector<std::shared_ptr<joiner::TupleJoiner> >&);
  bool nextTupleJoinerMsg(messageqcpp::ByteStream&);
  // 	void setSmallSideKeyColumn(uint32_t col);

  /* OR hacks */
  void setBOP(uint32_t op);  // BOP_AND or BOP_OR, default is BOP_AND
  void setForHJ(bool b);     // default is false

  /* self-join */
  void jobInfo(const JobInfo* jobInfo)
  {
    fJobInfo = jobInfo;
  }

  /* Function & Expression support */
  void setFEGroup1(boost::shared_ptr<funcexp::FuncExpWrapper>, const rowgroup::RowGroup& input);
  void setFEGroup2(boost::shared_ptr<funcexp::FuncExpWrapper>, const rowgroup::RowGroup& output);
  void setJoinFERG(const rowgroup::RowGroup& rg);

  /* This fcn determines whether or not the containing TBPS obj will process results
  from a join or put the RG data right in the output datalist. */
  bool pmSendsFinalResult() const
  {
    // isn't aware of UM-only joins.  Function name is a bit misleading.
    return (tJoiners.empty() || (PMJoinerCount > 0 && (fe2 || aggregatorPM)));
  }
  const std::string toMiniString() const;

  void priority(uint32_t p)
  {
    _priority = p;
  }
  uint32_t priority() const
  {
    return _priority;
  }

  void deliverStringTableRowGroup(bool b);

  void setUuid(const boost::uuids::uuid& u)
  {
    uuid = u;
  }

  void setMaxPmJoinResultCount(uint32_t count)
  {
    maxPmJoinResultCount = count;
  }

 private:
  // Per-operation weights for calculateBPPWeight(). They are class-wide
  // constants, so they are static rather than being stored in (and making
  // non-assignable) every instance.
  static constexpr size_t perColumnProjectWeight_ = 10;
  static constexpr size_t perColumnFilteringWeight_ = 10;
  static constexpr size_t fe1Weight_ = 10;
  static constexpr size_t fe2Weight_ = 10;
  static constexpr size_t joinWeight_ = 500;
  static constexpr size_t aggregationWeight_ = 500;

  // A simple cost model based on the SQL operations this BPP performs,
  // leveraged by the FairThreadPool run by the PP facility. Every operation
  // counted here spends some CPU, so a morsel that performs more of them is
  // given a higher weight.
  uint32_t calculateBPPWeight() const
  {
    size_t weight = perColumnProjectWeight_ * projectCount;
    weight += perColumnFilteringWeight_ * filterCount;
    weight += joinWeight_ * tJoiners.size();
    if (aggregatorPM)
      weight += aggregationWeight_;
    if (fe1)
      weight += fe1Weight_;
    if (fe2)
      weight += fe2Weight_;
    // Accumulate in size_t and narrow once to the uint32_t return type
    // (the previous code narrowed implicitly on every step; the result is
    // the same modulo 2^32).
    return static_cast<uint32_t>(weight);
  }
  BPSOutputType ot;

  bool needToSetLBID;

  BRM::QueryContext versionInfo;
  uint32_t txnID;
  uint32_t sessionID;
  uint32_t stepID;
  uint32_t uniqueID;

  // # of times to loop over the command arrays
  // ...  This is 1, except when the first command is a scan, in which case
  // this single BPP object produces count responses.
  uint16_t count;

  /* XXXPAT: tradeoff here.  Memory wasted by statically allocating all of these
      arrays on the UM (most aren't used) vs more dynamic allocation
      on the PM */

  uint64_t baseRid;  // first abs RID of the logical block

  uint16_t relRids[LOGICAL_BLOCK_RIDS];
  boost::scoped_array<uint64_t> absRids;
  // TODO MCOL-641 Do we need uint128_t buffers here?
  // When would sendValues=true, in which case values[]
  // is sent to primproc?
  uint64_t values[LOGICAL_BLOCK_RIDS];
  uint16_t ridCount;
  bool needStrValues;

  uint16_t wideColumnsWidths;

  std::vector<SCommand> filterSteps;
  std::vector<SCommand> projectSteps;
  //@bug 1136
  uint16_t filterCount;
  uint16_t projectCount;
  bool needRidsAtDelivery;
  uint16_t ridMap;

  //	TableBand templateTB;
  uint32_t tableOID;
  boost::scoped_array<int> tablePositions;
  uint32_t tableColumnCount;
  bool sendValues;
  bool sendAbsRids;
  bool _hasScan;
  bool LBIDTrace;

  /* for tuple return type */
  std::vector<uint16_t> colWidths;
  uint32_t tupleLength;
  // 		uint32_t rowCounter;    // for debugging
  // 		uint32_t rowsProcessed;
  uint16_t status;

  /* for Joiner serialization */
  bool pickNextJoinerNum();
  uint32_t pos, joinerNum;
  boost::shared_ptr<std::vector<ElementType> > smallSide;
  boost::scoped_array<uint32_t> posByJoinerNum;

  /* for RowGroup return type */
  rowgroup::RowGroup inputRG, projectionRG;
  uint32_t valueColumn;

  /* for PM Aggregation */
  rowgroup::RowGroup joinedRG;
  rowgroup::SP_ROWAGG_PM_t aggregatorPM;
  rowgroup::RowGroup aggregateRGPM;

  /* UM portion of the PM join alg */
  std::vector<std::shared_ptr<joiner::TupleJoiner> > tJoiners;
  std::vector<rowgroup::RowGroup> smallSideRGs;
  rowgroup::RowGroup largeSideRG;
  std::vector<std::vector<uint32_t> > smallSideKeys;
  boost::scoped_array<uint32_t> tlKeyLens;
  bool sendTupleJoinRowGroupData;
  uint32_t PMJoinerCount;

  /* OR hack */
  uint8_t bop;  // BOP_AND or BOP_OR
  bool forHJ;   // indicate if feeding a hashjoin, doJoin does not cover smallside

  /* Self-join */
  const JobInfo* fJobInfo;

  /* Functions & Expressions support */
  boost::shared_ptr<funcexp::FuncExpWrapper> fe1, fe2;
  rowgroup::RowGroup fe1Input, fe2Output;
  rowgroup::RowGroup joinFERG;

  mutable boost::scoped_array<rowgroup::RowGroup>
      primprocRG;  // the format of the data received from PrimProc
  uint32_t threadCount;

  unsigned fJoinerChunkSize;
  uint32_t dbRoot;
  bool hasSmallOuterJoin;
  uint32_t maxPmJoinResultCount = 1048576;

  uint32_t _priority;

  boost::uuids::uuid uuid;

  friend class CommandJL;
  friend class ColumnCommandJL;
  friend class PassThruCommandJL;
};

}  // namespace joblist
